Skip to content

feat: multiple protections against leaky ws connections ws subscriber v2#2047

Open
LukasDeco wants to merge 1 commit intomasterfrom
lukas/ws-leaky-connections-protection
Open

feat: multiple protections against leaky ws connections ws subscriber v2#2047
LukasDeco wants to merge 1 commit intomasterfrom
lukas/ws-leaky-connections-protection

Conversation

@LukasDeco
Copy link
Copy Markdown
Collaborator

@LukasDeco LukasDeco commented Dec 5, 2025

Summary by CodeRabbit

Release Notes

  • New Features

    • Added diagnostic monitoring counters for connection and subscription tracking.
  • Bug Fixes

    • Improved oracle subscription reliability by preventing duplicate subscriptions.
    • Enhanced error recovery with automatic cleanup on subscription failures.
  • Performance

    • Optimized oracle state management to reduce race conditions and unnecessary updates.

@LukasDeco LukasDeco force-pushed the lukas/ws-leaky-connections-protection branch from f78a3a7 to b8b9a92 Compare March 3, 2026 22:52
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 3, 2026

Walkthrough

This change introduces diagnostic counters for socket lifecycle tracking, implements debounced state refreshers to prevent race conditions, adds in-flight promise tracking for oracle subscription deduplication, and orchestrates a reconciliation flow to ensure active oracle subscriptions align with current market references.

Changes

Cohort / File(s) Summary
Diagnostic Instrumentation
sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts
Added static counters (totalSocketsOpened, currentActiveSockets, instancesCreated, subscribeCalls) and constructor logging; monkey-patched WebSocket lifecycle events to track socket state changes.
Oracle Subscription Deduplication
sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts
Implemented oracleSubscribePromises map to reuse in-flight subscription promises per oracleId, preventing concurrent duplicate subscriptions and clearing entries after successful subscription storage.
Debounced State Management
sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts
Added timeout fields (perpOracleMapRefreshTimeout, spotOracleMapRefreshTimeout, oracleReconcileTimeout) with corresponding scheduler methods to defer map mutations and reconciliation, reducing race conditions and getter side-effects.
Subscription Reconciliation
sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts
Introduced reconcileOracleSubscribers() method that subscribes to missing required oracle subscriptions and unsubscribes from surplus ones; triggered automatically after oracle maps resolve via debounced scheduler.
Enhanced Getter Assertions
sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts
Modified getOraclePriceDataAndSlotForPerpMarket and getOraclePriceDataAndSlotForSpotMarket to assert subscription state and schedule debounced map refreshes when oracle references change, instead of performing immediate mutations.
Failure Recovery
sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts
Enhanced subscribe() with best-effort cleanup to unsubscribe from state, markets, and oracle subscriptions if the overall subscription flow fails.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 A rabbit hops through sockets spun,
No duplicate orbits—just clean runs!
With timers debounced and maps reconciled,
The oracle dance is now defiled
Of race conditions wild and free,
Our subscriptions flow in harmony! ✨

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main objective of the PR: adding protections against leaky WebSocket connections in the subscriber v2 component, which is clearly reflected in the changes (diagnostic counters, debounced refreshers, reconciliation flow, and cleanup logic).
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts`:
- Around line 452-459: The unsubscribeFromOracles() call only unsubscribes
sockets but doesn't clear the oracle bookkeeping maps, leaving stale entries in
oracleSubscribers and oracleSubscribePromises that make future
subscribeToOracles() skip subscriptions; update unsubscribeFromOracles() (or add
a clear step immediately after its call in the cleanup block) to also clear
oracleSubscribers and oracleSubscribePromises (e.g., call
oracleSubscribers.clear() and oracleSubscribePromises.clear() or reset them) so
state is fully cleaned and subscribeToOracles() can re-subscribe correctly.
- Around line 882-919: The scheduled async callbacks in
schedulePerpOracleMapRefresh, scheduleSpotOracleMapRefresh, and
scheduleOracleReconcile can run after unsubscribe() and re-open subscriptions;
change each scheduled handler to no-op if the instance is torn down by adding a
guard (e.g., check a boolean flag like this.isClosed or this.unsubscribed)
before calling setPerpOracleMap, setSpotOracleMap, or
reconcileOracleSubscribers, and ensure unsubscribe() sets that flag and clears
perpOracleMapRefreshTimeout, spotOracleMapRefreshTimeout, and
oracleReconcileTimeout; alternatively capture the timeout id in a local variable
and verify it still matches the corresponding this.*Timeout before invoking the
async work so addOracle/resubscribe cannot run after teardown.
- Around line 147-225: The code is monkey-patching private internals
(_connectWebsocket, _ws, _activeSubscriptions) of rpcSubscriptions to track WS
open/close which is brittle and swallows errors; replace this with
instrumentation that uses the public RPC subscription APIs (e.g.,
rpcSubscriptions.accountNotifications() or other public subscription methods) or
a documented transport/observer wrapper so you no longer touch private symbols.
Concretely, remove the monkey-patch logic (the PATCHED_FLAG branch and
attachLoggingToWs usage), and instead hook into the public AsyncIterator/event
interface returned by rpcSubscriptions.accountNotifications() (or the equivalent
public subscription factory) to increment/decrement
WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened and
currentActiveSockets and emit the same log messages; ensure errors are surfaced
(no broad swallowing try/catch) and keep logging gated by
this.resubOpts?.logResubMessages.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e93a87e and b8b9a92.

📒 Files selected for processing (1)
  • sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts

Comment on lines +147 to +225
// Monkey-patch rpcSubscriptions internals for minimal WS open/close logging
try {
const subsAny = this.rpcSubscriptions as any;
const PATCHED_FLAG = '__driftWsPatched';

const attachLoggingToWs = (ws: any) => {
if (!ws || ws.__driftPatched) return;
ws.__driftPatched = true;
WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened += 1;
WebSocketDriftClientAccountSubscriberV2.currentActiveSockets += 1;
const readyState = ws.readyState;
const activeSubs =
(subsAny?._activeSubscriptions && subsAny._activeSubscriptions.size) || 0;
if (this.resubOpts?.logResubMessages) {
console.log(
`[WS OPEN] readyState=${readyState} activeSubs=${activeSubs} totalOpened=${WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened} currentActive=${WebSocketDriftClientAccountSubscriberV2.currentActiveSockets}`
);
}
const onClose = () => {
WebSocketDriftClientAccountSubscriberV2.currentActiveSockets = Math.max(
0,
WebSocketDriftClientAccountSubscriberV2.currentActiveSockets - 1
);
const rs = ws.readyState;
const act =
(subsAny?._activeSubscriptions &&
subsAny._activeSubscriptions.size) || 0;
if (this.resubOpts?.logResubMessages) {
console.log(
`[WS CLOSE] readyState=${rs} activeSubs=${act} totalOpened=${WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened} currentActive=${WebSocketDriftClientAccountSubscriberV2.currentActiveSockets}`
);
}
};
// Support browser/node ws
if (typeof ws.addEventListener === 'function') {
ws.addEventListener('close', onClose);
}
if (typeof ws.on === 'function') {
ws.on('close', onClose);
}
};

if (!subsAny[PATCHED_FLAG]) {
subsAny[PATCHED_FLAG] = true;
const originalConnect = subsAny._connectWebsocket;
if (typeof originalConnect === 'function') {
subsAny._connectWebsocket = (...args: any[]) => {
const prevWs = subsAny._ws;
const result = originalConnect.apply(subsAny, args);
const hook = () => {
try {
const ws = subsAny._ws;
// Only treat as new if object identity changed or not patched yet
if (ws && ws !== prevWs) {
// may not be open yet; still attach listeners and count as opened
attachLoggingToWs(ws);
}
} catch (_e) {
// swallow
}
};
try {
if (result && typeof result.then === 'function') {
result.then(() => hook()).catch(() => hook());
} else {
queueMicrotask(hook);
}
} catch (_e) {
setTimeout(hook, 0);
}
return result;
};
}

// Fallback: if a WS already exists, attach once and treat as opened
if (subsAny._ws && !subsAny._ws.__driftPatched) {
attachLoggingToWs(subsAny._ws);
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

🌐 Web query:

gill npm RpcSubscriptions _connectWebsocket _ws API documentation v0.10.2

💡 Result:

In gill v0.10.2, the rpcSubscriptions object you get from createSolanaClient() is not a Gill-specific RpcSubscriptions class with documented internals—it’s the Solana Kit (@solana/kit) RPC Subscriptions client that Gill wraps/re-exports. Gill’s docs describe creating rpcSubscriptions, but they don’t document private internals like _connectWebsocket or _ws. [1] [2]

What is documented / supported

  • Gill: create a client that includes rpcSubscriptions:
    • const { rpc, rpcSubscriptions } = createSolanaClient({ urlOrMoniker }) (Gill docs / README-level guidance). [1] [2]
  • Solana Kit: create subscriptions directly via:
    • createSolanaRpcSubscriptions('ws://...')
    • and consume subscriptions as AsyncIterators (for await...of). [3]

Where the subscriptions API is specified

  • The typed Subscriptions surface area (what methods exist and how subscription execution is shaped) is described by @solana/rpc-subscriptions-spec (and re-exported via Kit). [4]

About _connectWebsocket / _ws

Those names look like private implementation details (underscore-prefixed) and are not part of the public, stable API in the Kit/Gill docs—so there isn’t official “API documentation” for them; the supported approach is to use the exported creators (e.g. createSolanaRpcSubscriptions, or the transport/channel factory functions shown in Kit docs). [3]

Sources
[1] Gill API Reference (overview / install / exports): (gillsdk.com)
[2] Gill package overview (rpc + rpcSubscriptions via createSolanaClient): (socket.dev)
[3] Anza Kit docs (RPC Subscriptions, createSolanaRpcSubscriptions, AsyncIterator usage, transport customization): (github.com)
[4] @solana/rpc-subscriptions-spec (purpose + usage pattern around createSolanaRpcSubscriptions): (npmjs.com)

Citations:


🌐 Web query:

gill library websocket public API alternatives

💡 Result:

If you’re using gill (Solana JS/TS SDK) and want alternatives for WebSocket (PubSub) / “public API” access, here are the main options:

  1. Use Solana’s standard JSON-RPC PubSub WebSockets directly (no SDK required)
    Solana exposes WebSocket subscription methods like accountSubscribe, logsSubscribe, programSubscribe, etc. via the RPC PubSub endpoint. [1], [2]

  2. Use Anza’s @solana/kit / @solana/rpc-subscriptions (modern, low-level primitives)
    Provides a TypeScript interface over Solana RPC subscriptions and supports the standard Solana WebSocket methods. [3]

  3. Use a managed Solana WebSocket RPC provider (standard + value-add streams)

  • Helius Enhanced WebSockets (transaction/account streaming + filtering; their docs distinguish Enhanced vs Standard Solana WebSockets). [4], [5]
  • Surfpool WebSockets (documents Solana-style subscription endpoints like logsSubscribe, programSubscribe, blockSubscribe). [6]
  • SolanaTracker WebSockets (documents accountSubscribe over WSS with API key). [7]

Sources
[1] Solana RPC WebSocket methods overview: (solana.com)
[2] Example method detail (accountSubscribe): (solana.com)
[3] @solana/kit RPC subscriptions: (solanakit.com)
[4] Helius WebSockets/Webhooks product page: (helius.dev)
[5] Helius Enhanced WebSockets docs: (helius.dev)
[6] Surfpool WebSocket RPC docs: (docs.surfpool.run)
[7] SolanaTracker accountSubscribe docs: (docs.solanatracker.io)

Citations:


🏁 Script executed:

# Search for any references to gill in the codebase to understand version and usage
rg "gill" --type json,ts --max-count=20

Repository: drift-labs/protocol-v2

Length of output: 97


🏁 Script executed:

# Look for package.json to see gill dependency version
find . -name "package.json" -type f -exec grep -l "gill" {} \;

Repository: drift-labs/protocol-v2

Length of output: 84


🏁 Script executed:

# Check the gill version in package.json
cat ./sdk/package.json | grep -A 2 -B 2 "gill"

Repository: drift-labs/protocol-v2

Length of output: 204


🏁 Script executed:

# Look for any existing public API usage patterns for rpcSubscriptions in the codebase
rg "createSolanaRpc|rpcSubscriptions" sdk/src --type ts -B 2 -A 2 | head -40

Repository: drift-labs/protocol-v2

Length of output: 3218


Refactor to use documented public APIs instead of monkey-patching private internals.

Relying on _connectWebsocket, _ws, and _activeSubscriptions (lines 191, 198, 159) couples this code to undocumented private implementation details of @solana/kit's RPC Subscriptions client, which Gill wraps. These fields are not part of any stable API contract and can change without notice in dependency updates. The broad try/catch (lines 209, 216, 222, 224) will silently swallow breaking changes, making failures difficult to diagnose.

Instead, use the public subscription methods (rpcSubscriptions.accountNotifications(), etc.) and their AsyncIterator interface, or create a custom transport/observer wrapper via the documented Solana Kit patterns for instrumentation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts` around lines 147
- 225, The code is monkey-patching private internals (_connectWebsocket, _ws,
_activeSubscriptions) of rpcSubscriptions to track WS open/close which is
brittle and swallows errors; replace this with instrumentation that uses the
public RPC subscription APIs (e.g., rpcSubscriptions.accountNotifications() or
other public subscription methods) or a documented transport/observer wrapper so
you no longer touch private symbols. Concretely, remove the monkey-patch logic
(the PATCHED_FLAG branch and attachLoggingToWs usage), and instead hook into the
public AsyncIterator/event interface returned by
rpcSubscriptions.accountNotifications() (or the equivalent public subscription
factory) to increment/decrement
WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened and
currentActiveSockets and emit the same log messages; ensure errors are surfaced
(no broad swallowing try/catch) and keep logging gated by
this.resubOpts?.logResubMessages.

Comment on lines +452 to +459
// Best-effort cleanup of any partially created subscriptions to avoid leaks
try {
await Promise.all([
this.stateAccountSubscriber?.unsubscribe(),
this.perpMarketAllAccountsSubscriber?.unsubscribe(),
this.spotMarketAllAccountsSubscriber?.unsubscribe(),
this.unsubscribeFromOracles(),
]);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Failure cleanup unsubscribes sockets but leaves stale oracle bookkeeping maps populated.

Line 458 cleanup calls unsubscribeFromOracles(), but there’s no map clear here; stale entries in oracleSubscribers / oracleSubscribePromises can cause future subscribeToOracles() to skip required subscriptions.

🔧 Proposed fix
 async unsubscribeFromOracles(): Promise<void> {
-  await Promise.all(
-    Array.from(this.oracleSubscribers.values()).map((accountSubscriber) =>
-      accountSubscriber.unsubscribe()
-    )
-  );
+  const subscribers = Array.from(this.oracleSubscribers.values());
+  await Promise.all(subscribers.map((accountSubscriber) => accountSubscriber.unsubscribe()));
+  this.oracleSubscribers.clear();
+  this.oracleSubscribePromises.clear();
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts` around lines 452
- 459, The unsubscribeFromOracles() call only unsubscribes sockets but doesn't
clear the oracle bookkeeping maps, leaving stale entries in oracleSubscribers
and oracleSubscribePromises that make future subscribeToOracles() skip
subscriptions; update unsubscribeFromOracles() (or add a clear step immediately
after its call in the cleanup block) to also clear oracleSubscribers and
oracleSubscribePromises (e.g., call oracleSubscribers.clear() and
oracleSubscribePromises.clear() or reset them) so state is fully cleaned and
subscribeToOracles() can re-subscribe correctly.

Comment on lines +882 to +919
private schedulePerpOracleMapRefresh(): void {
if (this.perpOracleMapRefreshTimeout) {
clearTimeout(this.perpOracleMapRefreshTimeout);
}
this.perpOracleMapRefreshTimeout = setTimeout(async () => {
try {
await this.setPerpOracleMap();
} catch (_e) {
// swallow
}
}, 50);
}

private scheduleSpotOracleMapRefresh(): void {
if (this.spotOracleMapRefreshTimeout) {
clearTimeout(this.spotOracleMapRefreshTimeout);
}
this.spotOracleMapRefreshTimeout = setTimeout(async () => {
try {
await this.setSpotOracleMap();
} catch (_e) {
// swallow
}
}, 50);
}

private scheduleOracleReconcile(): void {
if (this.oracleReconcileTimeout) {
clearTimeout(this.oracleReconcileTimeout);
}
this.oracleReconcileTimeout = setTimeout(async () => {
try {
await this.reconcileOracleSubscribers();
} catch (_e) {
// swallow
}
}, 75);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Debounced callbacks can run after unsubscribe and reopen oracle subscriptions (leak risk).

Line 886, Line 899, and Line 912 schedule async work that can still fire after unsubscribe(). Those callbacks call set*OracleMap()/reconcileOracleSubscribers(), which can call addOracle() and resubscribe while the instance is supposed to be torn down.

🔧 Proposed fix
 private schedulePerpOracleMapRefresh(): void {
+  if (!this.isSubscribed) return;
   if (this.perpOracleMapRefreshTimeout) {
     clearTimeout(this.perpOracleMapRefreshTimeout);
   }
   this.perpOracleMapRefreshTimeout = setTimeout(async () => {
+    if (!this.isSubscribed) return;
     try {
       await this.setPerpOracleMap();
     } catch (_e) {
       // swallow
+    } finally {
+      this.perpOracleMapRefreshTimeout = undefined;
     }
   }, 50);
 }

 private scheduleSpotOracleMapRefresh(): void {
+  if (!this.isSubscribed) return;
   if (this.spotOracleMapRefreshTimeout) {
     clearTimeout(this.spotOracleMapRefreshTimeout);
   }
   this.spotOracleMapRefreshTimeout = setTimeout(async () => {
+    if (!this.isSubscribed) return;
     try {
       await this.setSpotOracleMap();
     } catch (_e) {
       // swallow
+    } finally {
+      this.spotOracleMapRefreshTimeout = undefined;
     }
   }, 50);
 }

 private scheduleOracleReconcile(): void {
+  if (!this.isSubscribed) return;
   if (this.oracleReconcileTimeout) {
     clearTimeout(this.oracleReconcileTimeout);
   }
   this.oracleReconcileTimeout = setTimeout(async () => {
+    if (!this.isSubscribed) return;
     try {
       await this.reconcileOracleSubscribers();
     } catch (_e) {
       // swallow
+    } finally {
+      this.oracleReconcileTimeout = undefined;
     }
   }, 75);
 }
 public async unsubscribe(): Promise<void> {
   if (!this.isSubscribed) {
     return;
   }

+  if (this.perpOracleMapRefreshTimeout) clearTimeout(this.perpOracleMapRefreshTimeout);
+  if (this.spotOracleMapRefreshTimeout) clearTimeout(this.spotOracleMapRefreshTimeout);
+  if (this.oracleReconcileTimeout) clearTimeout(this.oracleReconcileTimeout);
+  this.perpOracleMapRefreshTimeout = undefined;
+  this.spotOracleMapRefreshTimeout = undefined;
+  this.oracleReconcileTimeout = undefined;
+
   if (this.subscriptionPromise) {
     await this.subscriptionPromise;
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts` around lines 882
- 919, The scheduled async callbacks in schedulePerpOracleMapRefresh,
scheduleSpotOracleMapRefresh, and scheduleOracleReconcile can run after
unsubscribe() and re-open subscriptions; change each scheduled handler to no-op
if the instance is torn down by adding a guard (e.g., check a boolean flag like
this.isClosed or this.unsubscribed) before calling setPerpOracleMap,
setSpotOracleMap, or reconcileOracleSubscribers, and ensure unsubscribe() sets
that flag and clears perpOracleMapRefreshTimeout, spotOracleMapRefreshTimeout,
and oracleReconcileTimeout; alternatively capture the timeout id in a local
variable and verify it still matches the corresponding this.*Timeout before
invoking the async work so addOracle/resubscribe cannot run after teardown.

@aleph-labs-hq
Copy link
Copy Markdown

[vulture-outreach-20260308]
Hi @LukasDeco - on ws subscriber v2 hardening, I can provide a 24h integrator migration note for external operators:

  1. connectivity/reconnect behavior checklist
  2. monitoring + alert gates for rollout
  3. fallback plan for noisy/disconnected feeds

Can share a concise one-page format first.

@aleph-labs-hq
Copy link
Copy Markdown

[vulture-outreach-20260308]
Quick follow-up in case useful for release timing.

I can still deliver a compact 24h integration brief:

  1. change-impact map
  2. regression checklist
  3. go/no-go rollback gates

If useful, I can start immediately.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants